vim kafka_cars.conf

a.sources = r1
a.sinks = k1
a.channels = c1

#指定spooldir的属性
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /root/data # 数据所在目录
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp


#设置Kafka接收器
a.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a.sinks.k1.brokerList=master:9092

#设置Kafka的Topic   如果topic不存在会自动创建一个topic,默认分区为1，副本为1
a.sinks.k1.topic=kafka_cars

#设置序列化方式
a.sinks.k1.serializer.class=kafka.serializer.StringEncoder


#指定channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100 # 事物容量
# a.channels.c1.keep-alive = 30 # 等待时间

 #组装
a.sources.r1.channels = c1
a.sinks.k1.channel = c1


# 启动
nohup flume-ng agent -n a -f ./kafka_cars.conf -Dflume.root.logger=DEBUG,console >> kafka_cars.log 2>&1 &

kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic kafka_cars